/*
 * Copyright (c) 2021 The red-star Project
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.inyourcode.core.transport.session;

import com.inyourcode.core.GlobalConstants;
import com.inyourcode.core.cluster.ClusterNodeManager;
import com.inyourcode.core.monitor.MetricsConstants;
import com.inyourcode.core.monitor.MetricsService;
import com.inyourcode.core.util.StackTraceUtil;
import com.inyourcode.core.util.Strings;
import com.inyourcode.core.threads.api.ExecutorFactory;
import com.inyourcode.core.threads.ExecutorFactoryManager;
import com.inyourcode.core.threads.api.HashExecutor;
import com.inyourcode.core.threads.api.ScheduleExecutor;
import com.inyourcode.core.transport.session.api.FixedRequest;
import com.inyourcode.core.transport.session.api.ISessionInterceptor;
import com.inyourcode.core.transport.session.api.MessageHolder;
import com.inyourcode.core.transport.session.api.Session;
import com.inyourcode.core.transport.api.Status;
import com.inyourcode.core.transport.api.channel.JChannel;
import com.inyourcode.core.transport.api.processor.ProviderProcessor;
import com.inyourcode.core.transport.netty.channel.NettyChannel;
import com.inyourcode.core.transport.session.api.AsyncRequest;
import com.inyourcode.core.util.SystemClock;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

/**
 * Basic provider-side processor that routes inbound session requests to their handlers.
 * <ul>
 *     <li>Resolves the {@link Session} bound to the channel and the handler registered for the
 *     request's invoke id.</li>
 *     <li>Wraps every request in a task and hands it to an executor: the schedule executor for
 *     no-auth and async handlers, the hash executor (keyed by the session's hash code) for
 *     everything else.</li>
 *     <li>The async/fixed flags read from the handler wrapper are presumably driven by the
 *     {@link AsyncRequest} and {@link FixedRequest} annotations - confirm against
 *     {@code HandlerWrapper}.</li>
 * </ul>
 *
 * @author JackLei
 **/
public class BasicProviderProcessor implements ProviderProcessor<RequestContext> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(BasicProviderProcessor.class);
    private final ISessionInterceptor interceptor;
    private final HashExecutor hashExecutor;
    private final ScheduleExecutor scheduleExecutor;

    /**
     * Creates the processor with a hash executor, a schedule executor and the default interceptor.
     */
    public BasicProviderProcessor() {
        this.hashExecutor = ExecutorFactoryManager.newExecutor(ExecutorFactory.Target.HASH);
        this.scheduleExecutor = ExecutorFactoryManager.newExecutor(ExecutorFactory.Target.SCHEDULE);
        this.interceptor = ISessionInterceptor.DEFALUT;
    }

    /**
     * Looks up the session and handler for an inbound request and dispatches it for execution.
     * Requests whose invoke id has no registered handler are logged and dropped.
     *
     * @param channel        channel the request arrived on; cast to {@link NettyChannel}
     * @param requestContext decoded request
     * @throws Exception declared by the {@link ProviderProcessor} contract
     */
    @Override
    public void handleRequest(JChannel channel, RequestContext requestContext) throws Exception {
        // Use the same clock that SessionAcceptTask uses to compute the elapsed processing time,
        // so the duration is not derived from two different time sources.
        long timeStamp = SystemClock.millisClock().now();
        long invokeId = requestContext.getInvokeId();
        byte serializerCode = requestContext.getSerializerCode();
        // NOTE(review): the session is dereferenced without a null check - confirm that
        // SessionService.getSessionByChannel never returns null for a live channel.
        Session session = SessionService.getSessionByChannel((NettyChannel) channel);
        if (session.getExecutor() == null) {
            session.setExecutor(hashExecutor);
        }

        HandlerWrapper wrapper = SessionHandlerMapping.getHandlerWrapper(invokeId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("receive request message from client, traceId: {}, invokeId: {}, body: {}",
                    requestContext.getTraceId(), invokeId, Strings.netMsgToJSON(serializerCode, requestContext.getData()));
        }

        if (wrapper == null) {
            LOGGER.error("The wrapper could not be found when traceId: {}, invokeId: {}", requestContext.getTraceId(), invokeId);
            return;
        }

        dispatch(new SessionAcceptTask(session, wrapper, requestContext, timeStamp));
    }

    /**
     * Logs a failure reported for the given channel; no response is written here.
     */
    @Override
    public void handleException(JChannel channel, RequestContext request, Status status, Throwable cause) {
        LOGGER.error("handle exception,channel = {},exception = {}", channel, StackTraceUtil.stackTrace(cause));
    }

    /**
     * Increments the cluster node load (when a cluster manager is present), creates the session
     * for the new channel and notifies the session-open listeners.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ClusterNodeManager clusterNodeManager = ClusterNodeManager.getInstance();
        if (clusterNodeManager != null) {
            clusterNodeManager.incNodeLoad();
        }
        Session session = SessionService.create(ctx.channel());
        SessionListenerService.listenOpen(session);
    }

    /**
     * Decrements the cluster node load (when a cluster manager is present), removes the session
     * bound to the channel and notifies the session-close listeners.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ClusterNodeManager clusterNodeManager = ClusterNodeManager.getInstance();
        if (clusterNodeManager != null) {
            clusterNodeManager.deIncNodeLoad();
        }
        // NOTE(review): the removed session is passed on as-is - confirm listenClose tolerates
        // null if no session was registered for this channel.
        Session removed = SessionService.remove(ctx.channel());
        SessionListenerService.listenClose(removed);
        // TODO release resources on abnormal disconnect
    }

    /**
     * Chooses the executor for a task and submits it.
     * <ol>
     *     <li>The interceptor may veto the request, in which case nothing is executed.</li>
     *     <li>No-auth handlers run on the schedule executor without an authentication check.</li>
     *     <li>All other handlers require an authenticated session; otherwise the request is
     *     logged and dropped.</li>
     *     <li>Async handlers run on the schedule executor; every remaining handler (fixed or
     *     unflagged) runs on the hash executor keyed by the session's hash code.</li>
     * </ol>
     * Any exception raised while dispatching is logged and not rethrown.
     *
     * @param task the task to submit
     */
    protected void dispatch(SessionAcceptTask task) {
        try {
            HandlerWrapper wrapper = task.wrapper;
            Object message = task.requestContext.getData();
            Session session = task.session;
            if (!this.interceptor.intercept(wrapper, message, session)) {
                return;
            }

            if (wrapper.isNoAuthReq()) {
                this.scheduleExecutor.execute(task);
                return;
            }

            if (!session.isAuth()) {
                // TODO close the connection
                LOGGER.error("session is forbid, channel = {}", session.channel());
                return;
            }

            if (wrapper.isAsyncReq()) {
                this.scheduleExecutor.execute(task);
                return;
            }

            // Fixed requests and unflagged requests take the same route: the hash executor,
            // keyed by the session's hash code.
            this.hashExecutor.execute(session.hashCode(), task);
        } catch (Exception ex) {
            LOGGER.error("network request processing failed,{}", StackTraceUtil.stackTrace(ex));
        }
    }

    /**
     * Runnable that invokes the handler for one request and writes the handler's response, if
     * any, back to the session.
     */
    class SessionAcceptTask implements Runnable {
        private final Session session;
        private final HandlerWrapper wrapper;
        private final MessageHolder requestContext;
        private final long timeStamp;

        /**
         * @param session        session the request belongs to
         * @param wrapper        handler to invoke
         * @param requestContext the request message
         * @param timeStamp      time, in milliseconds, at which the processor received the request
         */
        public SessionAcceptTask(Session session, HandlerWrapper wrapper, MessageHolder requestContext, long timeStamp) {
            this.session = session;
            this.wrapper = wrapper;
            this.requestContext = requestContext;
            this.timeStamp = timeStamp;
        }

        /**
         * Invokes the handler, records metrics when enabled, and writes a {@link MessageHolder}
         * response back with the request's serializer code and trace id. A {@code null} response
         * means nothing is sent; a response of any other type is logged as an error and dropped.
         * Every exception is logged and swallowed so that it does not propagate to the executor.
         */
        @Override
        public void run() {
            int traceId = requestContext.getTraceId();
            byte serializerCode = requestContext.getSerializerCode();
            try {
                Object response = wrapper.invoke(session, requestContext);

                String messageClazzSimpleName = wrapper.getMessageClazz().getSimpleName();
                if (GlobalConstants.METRIC_NEEDED) {
                    MetricsService metrics = MetricsService.getInstance();
                    metrics.incCount(MetricsConstants.NAME_REQUEST_COUNT_PREFIX + messageClazzSimpleName);
                    metrics.timerUpdate(MetricsConstants.NAME_REQUEST_PROCESS_SERVER_TIME + messageClazzSimpleName,
                            SystemClock.millisClock().now() - timeStamp, TimeUnit.MILLISECONDS);
                }

                if (response == null) {
                    return;
                }

                if (!(response instanceof MessageHolder)) {
                    LOGGER.error("Message return type must be MessageHolder, traceId: {}, response: {}, ", traceId, response);
                    return;
                }

                MessageHolder reply = (MessageHolder) response;
                reply.setSerializerCode(serializerCode);
                reply.setTraceId(traceId);
                session.write(reply, future -> {
                    // End-to-end time is measured from the request's own timestamp and is only
                    // recorded when the write succeeded.
                    if (future.isSuccess() && GlobalConstants.METRIC_NEEDED) {
                        MetricsService.getInstance().timerUpdate(MetricsConstants.NAME_REQUEST_PROCESS_ALL_TIME + messageClazzSimpleName,
                                SystemClock.millisClock().now() - requestContext.getTimeStamp(), TimeUnit.MILLISECONDS);
                    }
                });

                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("[{}]send message to client, traceId: {}, invokeId: {}, body: {}",
                            session.getId(), traceId, reply.getInvokeId(),
                            Strings.netMsgToJSON(serializerCode, reply.getData()));
                }
            } catch (Exception e) {
                // Reflection failures (InvocationTargetException, IllegalAccessException) and any
                // other exception are reported identically, so a single handler is enough.
                LOGGER.error("session message invoke failed, id: {}, traceId: {}, exception: {}", session.getId(), traceId, StackTraceUtil.stackTrace(e));
            }
        }
    }

}
